Java Concurrent - DelayQueue

Foreword

SimpleDelayQueue

DelayQueue的基本思想很简单:

  • 内部容器为一个优先队列, 优先条件为出队列时间. 即应该出队列的时间越早越优先.
  • 通过Condition#awaitNanos来实现时间上的延迟. 举例, 假设队列中只有一个元素, 需要延迟1秒. 那么试图获取此元素的线程就需要await(1, TimeUnit.SECONDS)

DelayQueue的成员非常少, 都列在下方. 其中的leader可以暂时忽略.

1
2
3
4
5
6
7
private transient final ReentrantLock lock = new ReentrantLock();

private final PriorityQueue<E> q = new PriorityQueue<E>();

private Thread leader = null;

private final Condition available = lock.newCondition();

值得注意的一点是, 虽然DelayQueueBlockingQueue的实现, 但是DelayQueue的所有插入方法都不会阻塞, put方法也只是调用了offer而已:

Inserts the specified element into this delay queue. As the queue is unbounded this method will never block.

如果不考虑leader, 可以仅仅基于lock, awailable,q实现一个更加简单的延迟队列. 我们只实现两个最核心的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
* A delay queue without leader
* @author shibinfei
*
* @param <E>
*/
public class SimpleDelayQueue<E extends Delayed> {

private final ReentrantLock lock = new ReentrantLock();

private final PriorityQueue<E> q = new PriorityQueue<E>();

private final Condition available = lock.newCondition();

public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);

if (q.peek() == e) {
available.signal();
}
return true;
} finally {
lock.unlock();
}
}

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
available.await();
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0) {
return q.poll();
} else {
available.awaitNanos(delay);
}
}
}
} finally {
if (q.peek() != null) {
available.signal();
}
lock.unlock();
}
}

}

offer

offer主体可以说是非常简单了:

  • 向优先队列中插入元素
  • 如果刚刚插入的元素为队列头部. 那么则对其他线程进行signal. 可以猜想在典型的生产线程/消费线程情况下, 没有拿到元素的线程应该是在await状态, 所以signal唤醒其中一个消费线程是必要的. 而如果元素不是队列的head, 那么通常表示此元素还不需要出队列, 没有必要白白唤醒一个线程. (TODO)

take

take的逻辑可以简单看一眼. 如果感觉不清晰没关系, 后面的举例更明了. 也可以参考流程图:

举个栗子

假设实现一种延迟指定秒的Delay, new FixedSecondsDelay(5)表示延迟5秒. 如果对实现感兴趣可以看如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 一种 {@code Delayed} 实现: 在指定秒之后可以出队列
* @author shibinfei
*
*/
public class FixedSecondsDelay implements Delayed {

// 可以出队列的时间
private Date outTime;

public Date getOutTime() {
return outTime;
}

public FixedSecondsDelay(long secondsToBeReady) {
this.outTime = new Date(System.currentTimeMillis() + secondsToBeReady * 1000);
}

@Override
public int compareTo(Delayed o) {
return outTime.compareTo(((FixedSecondsDelay)o).getOutTime());
}

@Override
public long getDelay(TimeUnit unit) {
return unit.convert(outTime.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}

}

调用代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
DelayQueue<FixedSecondsDelay> delayQueue = new DelayQueue<>();

for (int i = 0; i < 2; i++) {
new Thread(() -> {
try {
delayQueue.take(); // wait if elements can't be polled
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Thread" + i).start();
}

TimeUnit.SECONDS.sleep(2); // 保证前两个线程先执行
delayQueue.put(new FixedSecondsDelay(2)); // can be polled after 2 seconds

整个过程中涉及到了三个线程, 分别为循环中创建的消费线程Thread0, Thread1和主线程Main. 接下来对整个流程进行流水账说明:

1. `Thread0`获取锁之后, 马上进入了`await`, 而此操作释放了锁, 则`Thread1`进入, 同样进入`await`状态.
2. 接下来`Main`线程, 放入了一个两秒后可以出队列的元素,   此时只有一个元素, 故亦为队列头元素. 所以会`signal`一个等待线程, 假设为`Thread1`
3. 被唤醒的`Thread1` 进入了下次循环, 而此时头部元素不为空, 但是还没到时间, 仍需2秒才能出队列, 故`Thread1`开始`awaitNanos(delay)` (2秒) , 结束`awaitNanos`之后, 下次循环即可取出元素. 随机返回.

制造一个更加复杂一点的场景, delayQueue.put(new FixedSecondsDelay(2));之后再追加一个delayQueue.put(new FixedSecondsDelay(5));. 那么和上方, 第1, 2步是相同的. 第3步有一些差别:

3. 5秒出队列的元素, 由于不是队列头, 所以并没有去唤醒任一线程. 但是当`Thread1`结束之后, 会去`signal`等待的`Thread0`来继续处理此元素.  

Leader - follower

第一印象

Thread designated to wait for the element at the head of the queue. This variant of the Leader-Follower pattern (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to minimize unnecessary timed waiting. When a thread becomes the leader, it waits only for the next delay to elapse, but other threads await indefinitely.

这是关于leader的前半部分注释. 实际上”timed_waiting”以及”waits only for the next delay to elapse”的实际行为都是awaitNanos. 相应的, “await indefinitely”即await. 加深一下印象:

- leader表示等待处理队列head元素的线程
- 减少不必要的`awaitNanos`.

总体来讲, leader的作用就是尽量让线程处于await状态, 而不是await. 可想而知这基于一个前提: awaitNanos会带来不必要的性能消耗.

如何减少awaitNanos

对比DelayQueueSimpleDelayQueue. 前者多了这些判断

  • offer 中如果为头元素, 会将leader设置为null
  • take中判断leader != null, 会进入await. 并且awaitNanos前后分别会占用/放弃leader位置.

当面临如下场景时:

1
2
3
4
5
6
7
8
9
10
11
12
13
DelayQueue<Delayed> delayQueue = new DelayQueue<>();
Delayed delayed = new FixedSecondsDelay(10); // 10 seconds to be ready
delayQueue.offer(delayed);

for (int i = 0; i < 2; i++) {
new Thread(() -> {
try {
delayQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}

假设两个线程为Thread0, Thread1. 其中一个先获取锁之后, 占用了leader, 开始awaitNanos. 而另一个线程判断当前leader不为空, 则进入了await状态. 相比之下SimpleDelayQueue在这种场景中, 两个线程都在awaitNanos.

Leader Summary

  • 在处理队列head的线程才能是leader. 如果队列head更换, 那么对应线程的leader位置就不保了.
  • 一个线程成为leader之后, 它awaitNanos, 其他线程await(这句话在上面的case适用, 但是不是所有场景都如此)
  • leader拿完元素走人之后, 要signal其他线程(follower).

TODO // md 感觉说的不够清楚

See Also